[AWS IoT] MQTT QoS=1による再送について確認してみました
1 はじめに
CX事業本部の平内(SIN)です。
AWS IoT Coreで、MQTTをQoS=1(受信側も含む)でpublishした場合、適切に処理しないと、Coreから延々と再送されてしまいます。
下記は、その様子を再現してみたものです。
左側のウインドウで動作しているのは、トピック(test)をSubscribeして、メッセージの到着を待機しているプログラムです。
右側は、トピック(test)に対して、1回だけメッセージを送信して終了するプログラムです。
1回しか、送信していないのに、同じ内容の受信が止まりません。プログラムを再起動しても、それは続きます。
下に見えているのは、AWS IoTコンソールの「テスト」で、トピック(test)を確認しているものですが、こちらには、1回しか表示されていなので、一見すると「何処から誰が送っているのか?」と見えてもしまいます。
※ 2021/10/07 KeepAliveに関する記述を一部変更させていただいております。
2 QoS
MQTTのQoSは3つのレベルがありますが、AWS IoT Coreでは、このうち、0及び、1が利用可能です。
Qos | 説明 | AWS SDK |
---|---|---|
0 | At most once | mqtt.QoS.AT_MOST_ONCE |
1 | At least once | mqtt.QoS.AT_LEAST_ONCE |
2 | Exactly once | - |
QoS=1では、メッセージを受け取った際に返されるPUBACKを受け取れるまでCoreから再送が続きます。
AWS Summit Tokyo 2018 資料 AWS IoT の賢い利用の仕方とプログラミングの勘所より
3 コード
最初の動画で使用したコードは、以下のとおりです。
mqtt.pyは、pulish側及び、subscribe側の両方で共通的に使用しているMQTTクラスです。 qosは、mqtt.QoS.AT_LEAST_ONCEとなっています。
mqtt.py
from awscrt import io, mqtt from awsiot import mqtt_connection_builder class Mqtt(): def __init__(self, client_id): endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com" root_ca = "./root-CA.crt" cert = "./cert.pem" key = "./private.key" event_loop_group = io.EventLoopGroup(1) host_resolver = io.DefaultHostResolver(event_loop_group) client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver) self.__connection = mqtt_connection_builder.mtls_from_path( endpoint = endpoint, cert_filepath = cert, pri_key_filepath = key, client_bootstrap = client_bootstrap, ca_filepath = root_ca, client_id = client_id, clean_session = False, keep_alive_secs = 30) self.__connection.connect() def subscribe(self, topic, callback): self.__connection.subscribe( topic = topic, #qos = mqtt.QoS.AT_MOST_ONCE, qos = mqtt.QoS.AT_LEAST_ONCE, callback = callback) def publish(self, topic, payload): self.__connection.publish( topic = topic, payload = payload, #qos = mqtt.QoS.AT_MOST_ONCE) qos = mqtt.QoS.AT_LEAST_ONCE)
送信側のプログラムです。1回だけメッセージをpublishして終了しています。
publisher.py
# -*- coding: utf-8 -*- import time from mqtt import Mqtt topic = "test" mqtt = Mqtt("publisher") mqtt.publish(topic, '{"message":"hello"}') time.sleep(2) # 送信完了まで数秒待機
受信側のプログラムは、subscribeの後、永久ループで待機します。メッセージ到着時(callback)は、重たい処理を想定して、15秒待機しています。
subscriber.py
# -*- coding: utf-8 -*- import time from mqtt import Mqtt def callback(topic, payload): print("receive: {}".format(payload)) for i in range(15): print("sleep({})".format(i)) time.sleep(1) print("finish.") topic = "test" mqtt = Mqtt("subscriber") mqtt.subscribe(topic, callback) while(True): time.sleep(1)
4 PUBACK
PUBACKを返す様子をWiresharkで確認してみました。
赤枠が、PUBACKですが、上は3秒後に返され、下は、7秒後に返されていることが、確認できます。
この違いは、受信したタイミングで処理されるCallbackの処理時間で変化しています。
def callback(topic, payload): print("receive: {}".format(payload)) # for i in range(3): # <= 上は、3秒 for i in range(7): # <= 上は、7秒 print("sleep({})".format(i)) time.sleep(1)
最初に、試したプログラムでは、ここが15秒となっていたので、一応、PUBACKは、返しているが、タイムアウトで「返信なし」と判断されています。
ここで言えるのは、QoS=1で処理する場合、callbackは、処理時間を意識する必要があるということです。
どうしても重たい処理となる場合は、一例として、下記のように、別スレッドで処理すれば良いかも知れません。
def func(payload): print("receive: {}".format(payload)) for i in range(15): print("sleep({})".format(i)) time.sleep(1) print("finish.") def callback(topic, payload): thread1 = threading.Thread(target=func, args=(payload,)) thread1.start()
また、mqtt_connection_builder.mtls_from_path() で指定する、keep_alive_secsでの調整も可能かも知れません。
5 最後に
QoS=1を使用すれば、「確実な送達」というだけでなく、デバイス(受信側)が起動していないタイミングでも、先にメッセージを送っておいて、「起動した時に処理させる」というような事も可能です。